Skip to content

oplogPopulator: synthesise oplog key via $addFields in connector pipeline#2744

Open
delthas wants to merge 2 commits into
development/9.4from
improvement/BB-768/oplog-key-via-pipeline-addfields
Open

oplogPopulator: synthesise oplog key via $addFields in connector pipeline#2744
delthas wants to merge 2 commits into
development/9.4from
improvement/BB-768/oplog-key-via-pipeline-addfields

Conversation

@delthas
Copy link
Copy Markdown
Contributor

@delthas delthas commented May 29, 2026

Summary

Two commits on this branch:

  1. oplogPopulator: synthesise oplog key via $addFields in connector pipeline
    • extensions/oplogPopulator/pipeline/PipelineFactory.js: insert a $addFields stage into the connector's change-stream pipeline (right after $match, before the conditional location-strip $set) that synthesises a top-level key field via $ifNull coalescing $fullDocument.value.key and $updateDescription.updatedFields.value.key.
    • extensions/oplogPopulator/constants.js: replace the broken fullDocument nested record in output.schema.key with a top-level key: [string, null] field. The existing ns: {coll} projection is preserved, so the Kafka message key remains {ns: {coll: <bucket>}, key: <object-key>} — bucket-level isolation is unchanged.
    • Tests updated in both pipeline-factory subclasses, the functional pipeline test (real-mongo aggregation against the new stage), and the connectorConfig fixture used by ConnectorsManager tests.
  2. queuePopulator: counter for oplog events processed without a synthesised key
    • New lib/queuePopulator/KafkaLogConsumer/KafkaLogConsumerMetrics.js module (mirrors LifecycleMetrics: static class, per-method try/catch + handleError).
    • Counter s3_oplog_event_missing_key_total labelled by opType, incremented from ListRecordStream._transform when a consumed event passes the objectMd check but lacks the top-level key. Defensive observability — added per review.

Pure Backbeat change. No SMT, no Zenko image change, no operator flag. Always on.

Context

BB-768: the current key schema projects fullDocument.value.key, which is null on update events (since BB-355 removed change.stream.full.document=updateLookup). Every update for a given bucket therefore serialises to the same key Struct ({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition — both an ordering problem (insert → update on different partitions for the same object) and a throughput problem (a hot bucket's update traffic can't scale with partition count, blocking BB-756).

After ticket discussion (Jira comment 477122) we picked the pipeline-$addFields route over a Kafka Connect SMT: it's a single-repo fix, and the measured server-side cost is ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s — not a throughput concern.

For the same logical S3 object — insert (key in fullDocument.value.key), master PUT update (key in updatedFields.value.key), replication-status update, delete-marker update — all yield the same key value → same partition. Master and version documents both store the same value.key, so master/version events also collapse to the same partition without prefix-stripping.

Coupling caveat

The $ifNull variant relies on metadata always writing the whole value subdocument (full $set). Confirmed today against arsenal — there's no partial dotted-$set path for object MD. A future partial update ($set: {"value.x": …}) would not populate updatedFields.value.key and the resulting event would mis-partition. Accepted risk per the ticket discussion. The s3_oplog_event_missing_key_total counter exists to detect exactly this regression.

Migration

The change is propagated to existing connectors via the existing in-place PUT /connectors/{name}/config reconciliation (no recreate, no resume-token loss — the change touches the key schema + the added $addFields stage only, not the $match stage). Downstream oplog consumers don't read the Kafka message key, so the new key shape is consumer-transparent. See this comment for the full migration trace.

Issue: BB-768

@bert-e
Copy link
Copy Markdown
Contributor

bert-e commented May 29, 2026

Hello delthas,

My role is to assist you with the merge of this
pull request. Please type @bert-e help to get information
on this process, or consult the user documentation.

Available options
name description privileged authored
/after_pull_request Wait for the given pull request id to be merged before continuing with the current one.
/bypass_author_approval Bypass the pull request author's approval
/bypass_build_status Bypass the build and test status
/bypass_commit_size Bypass the check on the size of the changeset TBA
/bypass_incompatible_branch Bypass the check on the source branch prefix
/bypass_jira_check Bypass the Jira issue check
/bypass_peer_approval Bypass the pull request peers' approval
/bypass_leader_approval Bypass the pull request leaders' approval
/approve Instruct Bert-E that the author has approved the pull request. ✍️
/create_pull_requests Allow the creation of integration pull requests.
/create_integration_branches Allow the creation of integration branches.
/no_octopus Prevent Wall-E from doing any octopus merge and use multiple consecutive merge instead
/unanimity Change review acceptance criteria from one reviewer at least to all reviewers
/wait Instruct Bert-E not to run until further notice.
Available commands
name description privileged
/help Print Bert-E's manual in the pull request.
/status Print Bert-E's current status in the pull request TBA
/clear Remove all comments from Bert-E from the history TBA
/retry Re-start a fresh build TBA
/build Re-start a fresh build TBA
/force_reset Delete integration branches & pull requests, and restart merge process from the beginning.
/reset Try to remove integration branches unless there are commits on them which do not appear on the source branch.

Status report is not available.

@bert-e
Copy link
Copy Markdown
Contributor

bert-e commented May 29, 2026

Waiting for approval

The following approvals are needed before I can proceed with the merge:

  • the author

  • 2 peers

@codecov
Copy link
Copy Markdown

codecov Bot commented May 29, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 74.60%. Comparing base (c52fbcc) to head (f53649e).

Additional details and impacted files

Impacted file tree graph

Files with missing lines Coverage Δ
extensions/oplogPopulator/constants.js 100.00% <ø> (ø)
...ensions/oplogPopulator/pipeline/PipelineFactory.js 100.00% <ø> (ø)
...ulator/KafkaLogConsumer/KafkaLogConsumerMetrics.js 100.00% <100.00%> (ø)
...ueuePopulator/KafkaLogConsumer/ListRecordStream.js 100.00% <100.00%> (ø)

... and 8 files with indirect coverage changes

Components Coverage Δ
Bucket Notification 80.22% <ø> (ø)
Core Library 80.82% <100.00%> (-0.16%) ⬇️
Ingestion 70.63% <ø> (-0.61%) ⬇️
Lifecycle 79.06% <ø> (ø)
Oplog Populator 85.83% <ø> (ø)
Replication 59.71% <ø> (-0.08%) ⬇️
Bucket Scanner 85.76% <ø> (ø)
@@                 Coverage Diff                 @@
##           development/9.5    #2744      +/-   ##
===================================================
- Coverage            74.73%   74.60%   -0.13%     
===================================================
  Files                  199      200       +1     
  Lines                13650    13661      +11     
===================================================
- Hits                 10201    10192       -9     
- Misses                3439     3459      +20     
  Partials                10       10              
Flag Coverage Δ
api:retry 9.12% <0.00%> (-0.01%) ⬇️
api:routes 8.94% <0.00%> (-0.01%) ⬇️
bucket-scanner 85.76% <ø> (ø)
ft_test:queuepopulator 9.10% <36.36%> (-1.02%) ⬇️
ingestion 12.53% <36.36%> (-0.05%) ⬇️
lib 7.74% <0.00%> (-0.05%) ⬇️
lifecycle 18.88% <0.00%> (-0.13%) ⬇️
notification 1.02% <0.00%> (-0.01%) ⬇️
oplogPopulator 0.14% <0.00%> (-0.01%) ⬇️
replication 18.71% <0.00%> (-0.01%) ⬇️
unit 51.27% <100.00%> (+0.04%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@delthas delthas marked this pull request as ready for review May 29, 2026 10:27
@delthas delthas force-pushed the improvement/BB-768/oplog-key-via-pipeline-addfields branch from 1193adb to 20249e5 Compare May 29, 2026 10:28
@scality scality deleted a comment from claude Bot May 29, 2026
@delthas delthas force-pushed the improvement/BB-768/oplog-key-via-pipeline-addfields branch 2 times, most recently from 4edd41b to e24ee0a Compare May 29, 2026 10:43
@scality scality deleted a comment from claude Bot May 29, 2026
@delthas delthas force-pushed the improvement/BB-768/oplog-key-via-pipeline-addfields branch from e24ee0a to 37fb5a1 Compare May 29, 2026 10:47
@scality scality deleted a comment from claude Bot May 29, 2026
@delthas delthas requested review from a team, francoisferrand and maeldonn May 29, 2026 10:48
@scality scality deleted a comment from claude Bot May 29, 2026
Copy link
Copy Markdown
Contributor

@francoisferrand francoisferrand left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will the recently updated logic correctly update the pipeline in all cases?
or do we have some situations were we may still stay with an old pipeline?

Comment thread extensions/oplogPopulator/pipeline/PipelineFactory.js Outdated
Comment thread extensions/oplogPopulator/pipeline/PipelineFactory.js Outdated
@delthas delthas force-pushed the improvement/BB-768/oplog-key-via-pipeline-addfields branch from 37fb5a1 to e0ce22c Compare May 29, 2026 13:07
@delthas
Copy link
Copy Markdown
Contributor Author

delthas commented May 29, 2026

will the recently updated logic correctly update the pipeline in all cases?

Yes, on the next Backbeat restart, no existing connector stays on the old pipeline:

  • OplogPopulator.initializeConnectorsConnectorsManager._processOldConnectors retrieves each existing connector's config from Kafka Connect, extracts buckets from pipeline[0].$match (works on both the old 1–2-stage shape and the new 2–3-stage shape — the bucket stage stays at index 0), then constructs a new Connector with {...oldConfig, ...config} where config = _getDefaultConnectorConfiguration(). The spread overwrites output.schema.key with the new single-key-field Avro, and the new Connector starts with bucketsGotModified: true.
  • On the first reconciliation tick after restart, Connector.updatePipeline(doUpdate=true) rebuilds _config.pipeline via the new PipelineFactory.getPipeline() and PUTs the full _config (new schema + new pipeline) via _kafkaConnect.updateConnectorConfig. That's an in-place PUT /connectors/{name}/config — no recreate, no resume-token loss (the change doesn't touch $match).
  • If that first PUT fails (network blip / Connect not ready), bucketsGotModified stays true and it retries on each subsequent tick until success.
  • Newly-spawned connectors (from addConnector) go through _getDefaultConnectorConfiguration directly, so they start with the new schema + new pipeline from the first POST.

The one path that doesn't flip in-process: an already-running Backbeat with old code in memory — the in-process Connector._config was constructed before this code was deployed, so it keeps the old schema until restart. Backbeat restart is required as part of the rollout (which is normal for any Backbeat upgrade).

Also addressed in e0ce22c9:

Comment thread extensions/oplogPopulator/constants.js
@delthas delthas requested a review from francoisferrand May 29, 2026 13:09
@scality scality deleted a comment from claude Bot May 29, 2026
Comment thread tests/functional/oplogPopulator/PipelineFactory.js
@delthas delthas force-pushed the improvement/BB-768/oplog-key-via-pipeline-addfields branch from e0ce22c to bf8bafd Compare June 1, 2026 13:03
@scality scality deleted a comment from claude Bot Jun 1, 2026
@delthas delthas force-pushed the improvement/BB-768/oplog-key-via-pipeline-addfields branch from bf8bafd to def1be2 Compare June 1, 2026 13:16
…line

The current Kafka message key projects fullDocument.value.key, which is
null on update events since BB-355 removed
change.stream.full.document=updateLookup. Every update for a given bucket
therefore serialises to the same key Struct
({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition,
breaking per-object ordering across op types (insert vs update) and
pinning a hot bucket's update traffic to a single partition (a blocker
for the oplog scaling work tracked in BB-756).

Fix: insert a $addFields stage into the MongoDB connector's
change-stream pipeline (right after $match, before the conditional
location-strip $set) that synthesises a top-level 'key' field by
$ifNull coalescing $fullDocument.value.key and
$updateDescription.updatedFields.value.key. The connector's
output.schema.key replaces the broken fullDocument nested record with a
sibling 'key' field; the existing 'ns:{coll}' projection is preserved,
so the Kafka message key remains {ns:{coll:<bucket>}, key:<object-key>}
— bucket-level isolation is unchanged. All events for the same logical
S3 object (insert, master/version updates, replication-status updates,
delete-marker updates) yield the same key value and hash to the same
partition. Master and version documents share the same value.key, so
master/version events also collapse to the same partition without
prefix-stripping.

The $ifNull variant relies on metadata always writing the whole 'value'
subdocument (full $set). Confirmed against arsenal: there is no partial
dotted-$set path for object MD today. A hypothetical future partial
$set would not populate updateDescription.updatedFields.value.key and
that event would mis-partition — accepted risk per the ticket
discussion.

The change is propagated to existing connectors via the existing
in-place PUT /connectors/{name}/config reconciliation (no recreate, no
resume-token loss — the change touches the key schema + the added
$addFields stage only, not the $match stage). Downstream oplog consumers
do not read the Kafka message key, so the new key shape is
consumer-transparent.

History / discussion:

  https://scality.atlassian.net/browse/BB-768?focusedCommentId=477122

This supersedes the earlier SMT-based approach (a Kafka Connect Single
Message Transform deriving the key from documentKey._id with
master/version prefix-stripping), which would have spanned three repos
and added a new Java artifact + operator feature flag. Closed in favour
of this single-repo fix after a per-event-cost measurement showed the
$addFields adds ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s on
the mongod — not a throughput concern at our target rates.

Superseded work (to be closed):

  #2741   (SMT-track Backbeat PR)
  scality/Zenko#2410      (ZENKO-5274 — Java SMT in kafka-connect image)
  https://scality.atlassian.net/browse/ZKOP-553   (operator feature flag — no longer needed)

Issue: BB-768
@delthas delthas force-pushed the improvement/BB-768/oplog-key-via-pipeline-addfields branch from def1be2 to 3869977 Compare June 1, 2026 13:21
@scality scality deleted a comment from claude Bot Jun 1, 2026
…sed key

Adds a Prometheus counter `s3_oplog_event_missing_key_total` (labelled by
operationType) that increments when a consumed oplog event reaches
downstream processing in ListRecordStream but lacks the top-level 'key'
field synthesised by the connector pipeline in the previous commit.

Should stay at zero in steady-state. A non-zero rate signals a regression
— e.g. a future write path on object MD that doesn't $set the whole
'value' subdocument and so bypasses the $ifNull coalesce.

Wired as a new `KafkaLogConsumerMetrics` module mirroring the
LifecycleMetrics pattern (static class, per-method try/catch +
handleError so a prometheus-side failure can't propagate into the
oplog read path).

Per-PR review feedback.

Issue: BB-768
@delthas delthas force-pushed the improvement/BB-768/oplog-key-via-pipeline-addfields branch from 3869977 to f53649e Compare June 1, 2026 13:25
@scality scality deleted a comment from claude Bot Jun 1, 2026
@claude
Copy link
Copy Markdown

claude Bot commented Jun 1, 2026

LGTM — clean, well-tested change. The $addFields key synthesis is correctly placed in the pipeline, the Avro key schema flattening is consistent, the $ifNull coalescing handles insert/update/delete event shapes properly, and the missing-key metric provides good observability for the coupling caveat (partial dotted $set regression). Pipeline validation logic (extractBucketsFromConfig, isValid) only inspects pipeline[0], so the new stage at index 1 is transparent. Test coverage is thorough across unit and functional layers.

Review by Claude Code

@scality scality deleted a comment from claude Bot Jun 1, 2026
@delthas
Copy link
Copy Markdown
Contributor Author

delthas commented Jun 1, 2026

@francoisferrand : moving this back to 9.4 since 9.4 == 9.5 atm and not sure this warrants a different branch.

@delthas delthas changed the base branch from development/9.5 to development/9.4 June 1, 2026 14:23
@scality scality deleted a comment from bert-e Jun 1, 2026
@bert-e
Copy link
Copy Markdown
Contributor

bert-e commented Jun 1, 2026

Request integration branches

Waiting for integration branch creation to be requested by the user.

To request integration branches, please comment on this pull request with the following command:

/create_integration_branches

Alternatively, the /approve and /create_pull_requests commands will automatically
create the integration branches.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants